* Use posix_fadvise() on the buffer file. (@Svedrin)
- * Refactor buffer and cleanup alternative buffer path.
\ No newline at end of file
+ * The buffer size can now be adjusted by using the database.conf
+ configuration file.
+
\ No newline at end of file
int siridb_buffer_write_empty(
siridb_buffer_t * buffer,
siridb_series_t * series);
-int siridb_buffer_write_last_point(
+int siridb_buffer_write_point(
siridb_buffer_t * buffer,
- siridb_series_t * series);
+ siridb_series_t * series,
+ uint64_t * ts,
+ qp_via_t * val);
struct siridb_buffer_s
{
siridb_points_t * siridb_series_get_last(
siridb_series_t * series, int * required_shard);
siridb_points_t * siridb_series_get_count(siridb_series_t * series);
+void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj);
/*
* Increment the series reference counter.
*/
from test_log import TestLog
from test_log import TestLog
from test_pipe_support import TestPipeSupport
+from test_buffer import TestBuffer
Server.BUILDTYPE = 'Release'
run_test(TestUser())
run_test(TestLog())
run_test(TestPipeSupport())
+ run_test(TestBuffer())
_interval_range = None
_r = None
- def __init__(self, r, allowed_kinds=(int, float, str)):
+ def __init__(self, r, allowed_kinds=(int, float, str), wrong_type=False):
self.kind = r.choice(allowed_kinds)
self.lval = {
str: '',
self.likely_equal = r.choice([0.01, 0.1, 0.2, 0.5, 0.99])
self.likely_change_sign = r.choice([0.0, 0.1, 0.25, 0.5, 0.9])
- self.as_int = self.kind == float and r.random() > 0.9
+ self.as_int = wrong_type and self.kind == float and r.random() > 0.9
self.likely_inf = r.random() * 0.2 \
if self.kind == float and r.random() > 0.95 else False
self.likely_nan = r.random() * 0.2 \
if self.kind == float and r.random() > 0.95 else False
- self.gen_float = self.kind == int and r.random() > 0.97
+ self.gen_float = wrong_type and self.kind == int and r.random() > 0.97
self.name = self._gen_name()
Series._series.append(self)
kinds = [translate[k] for k in args.kinds]
for i in range(args.num_series):
- Series(r=series_rand, allowed_kinds=kinds)
+ Series(
+ r=series_rand,
+ allowed_kinds=kinds,
+ wrong_type=args.wrong_type)
def _gen_name(self):
name = '/n:{}/range:{},{}/eq:{}/cs:{}/opt:{}{}{}{}{}'.format(
default=('int', 'float'),
choices=('int', 'float')) # , 'str'
+ parser.add_argument(
+ '--wrong-type',
+ action='store_true',
+ help='Allow series to insert points using a wrong type')
+
parser.add_argument(
'--max-parallel',
type=int,
--- /dev/null
+import os
+import asyncio
+import functools
+import random
+import time
+from testing import Client
+from testing import default_test_setup
+from testing import gen_data
+from testing import gen_points
+from testing import gen_series
+from testing import InsertError
+from testing import PoolError
+from testing import QueryError
+from testing import run_test
+from testing import Series
+from testing import Server
+from testing import ServerError
+from testing import SiriDB
+from testing import TestBase
+from testing import UserAuthError
+
+
+class TestBuffer(TestBase):
+ title = 'Test buffer object'
+
+ async def _add_points(self):
+ for series_name in ['iris', 'db', 'ligo', 'sasha']:
+ if series_name not in self.total:
+ self.total[series_name] = []
+ batches = sum([ord(c) for c in series_name]) % 100
+ for i in range(batches):
+ npoints = []
+ n = int(i**0.5 * 10000 % 5) + 1
+ for p in range(n):
+ self.ts += (n + 5000) if i % 2 else (n - 5000)
+ npoints.append([self.ts, i*1000+p])
+ self.total[series_name].extend(npoints)
+ self.total[series_name].sort()
+ await self.client0.insert({series_name: npoints})
+
+ async def _test_equal(self):
+ for series_name, points in self.total.items():
+ res = await self.client0.query(f'select * from "{series_name}"')
+ res = res[series_name]
+ self.assertEqual(len(points), len(res))
+ self.assertEqual(points, res)
+
+ async def _change_buf_size(self, buffer_size):
+ self.client0.close()
+ result = await self.server0.stop()
+ self.assertTrue(result)
+ self.server0.set_buffer_size(self.db, buffer_size)
+ await self.server0.start(sleep=5)
+ await self.client0.connect()
+ res = await self.client0.query('show buffer_size')
+ self.assertEqual(res['data'][0]['value'], buffer_size)
+ await self._test_equal()
+
+ async def _change_buf_path(self, buffer_path):
+ self.client0.close()
+ result = await self.server0.stop()
+ self.assertTrue(result)
+ self.server0.set_buffer_path(self.db, buffer_path)
+ await self.server0.start(sleep=5)
+ await self.client0.connect()
+ res = await self.client0.query('show buffer_path')
+ self.assertEqual(res['data'][0]['value'], buffer_path)
+ res = await self.client0.query('show open_files')
+ self.assertEqual(res['data'][0]['value'], 3)
+ res = await self.client0.query(
+ f'alter server {self.uuid} set backup_mode true')
+ await asyncio.sleep(5)
+ res = await self.client0.query('show open_files')
+ self.assertEqual(res['data'][0]['value'], 0)
+ res = await self.client0.query(
+ f'alter server {self.uuid} set backup_mode false')
+ await self._test_equal()
+
+ @default_test_setup(1, time_precision='s', compression=False)
+ async def run(self):
+ await self.client0.connect()
+
+ res = await self.client0.query('show uuid')
+ self.uuid = res['data'][0]['value']
+
+ self.ts = 1500000000
+ self.total = {}
+
+ await self._add_points()
+ await self._test_equal()
+
+ await self._change_buf_path(os.path.join(
+ self.server0.dbpath,
+ self.db.dbname,
+ '../buf/'))
+
+ await self._change_buf_size(8192)
+
+ await self._add_points()
+ await self._test_equal()
+
+ await self._change_buf_size(8192)
+ await self._change_buf_size(512)
+
+ await self._add_points()
+ await self._test_equal()
+
+ await self._change_buf_size(1024)
+
+ await self._change_buf_path(os.path.join(
+ self.server0.dbpath,
+ self.db.dbname,
+ 'buf/'))
+
+ return False
+
+
+if __name__ == '__main__':
+ SiriDB.LOG_LEVEL = 'INFO'
+ Server.HOLD_TERM = True
+ Server.MEM_CHECK = True
+ Server.BUILDTYPE = 'Debug'
+ run_test(TestBuffer())
class TestSelect(TestBase):
title = 'Test select and aggregate functions'
- @default_test_setup(1, compression=False)
+ @default_test_setup(1, compression=False, buffer_size=1024)
async def run(self):
await self.client0.connect()
Klingon = ' ' + \
'qajunpaQHeylIjmo’ batlh DuSuvqang charghwI’ ‘It.'
+data = {
+ 'string': [
+ [1538660000, "some string value"],
+ [1538660010, -123456789],
+ [1538660020, -0.5],
+ [1538660030, 1/3],
+ ],
+ 'integer': [
+ [1538660000, 1],
+ [1538660010, 35.6],
+ [1538660020, "-50,6%"],
+ [1538660030, ""],
+ ],
+ 'double': [
+ [1538660000, 1.0],
+ [1538660010, -35],
+ [1538660010, "-50,6%"],
+ [1538660030, ""],
+ ]
+}
+
+expected = {
+ 'string': [
+ [1538660000, "some string value"],
+ [1538660010, '-123456789'],
+ [1538660020, '-0,500000'],
+ [1538660030, '0,333333'],
+ ],
+ 'integer': [
+ [1538660000, 1],
+ [1538660010, 35],
+ [1538660020, -50],
+ [1538660030, 0],
+ ],
+ 'double': [
+ [1538660000, 1.0],
+ [1538660010, -35.0],
+ [1538660010, -50.6],
+ [1538660030, 0.0],
+ ]
+}
+
class TestSeries(TestBase):
title = 'Test series object'
await self.client0.query('select * from "{}"'.format(Klingon)),
{Klingon: sorted(points)})
+ self.assertEqual(
+ await self.client0.insert(data),
+ {'success_msg': 'Successfully inserted 12 point(s).'})
+
+ self.assertAlmostEqual(
+ await self.client0.query(
+ 'select * from "string", "integer", "double"'),
+ expected)
+
self.client0.close()
# return False
self.dbpath = os.path.join(TEST_DIR, 'dbpath{}'.format(self.n))
self.name = 'SiriDB:{}'.format(self.listen_backend_port)
self.pid = None
+ self.buffer_path = None
+ self.buffer_size = None
@property
def addr(self):
self.pid = None
return True
+ def set_buffer_size(self, db, buffer_size):
+ self.buffer_size = buffer_size
+ config = configparser.RawConfigParser()
+ config.add_section('buffer')
+ if self.buffer_path is not None:
+ config.set('buffer', 'path', self.buffer_path)
+ config.set('buffer', 'size', self.buffer_size)
+ with open(os.path.join(
+ self.dbpath, db.dbname, 'database.conf'), 'w') as f:
+ config.write(f)
+
+ def set_buffer_path(self, db, buffer_path):
+ assert(buffer_path.endswith('/'))
+ curfile = os.path.join(self.dbpath, db.dbname, 'buffer.dat') \
+ if self.buffer_path is None else \
+ os.path.join(self.buffer_path, 'buffer.dat')
+ if not os.path.exists(buffer_path):
+ os.makedirs(buffer_path)
+ os.rename(curfile, os.path.join(buffer_path, 'buffer.dat'))
+ self.buffer_path = buffer_path
+ config = configparser.RawConfigParser()
+ config.add_section('buffer')
+ config.set('buffer', 'path', self.buffer_path)
+ if self.buffer_size is not None:
+ config.set('buffer', 'size', self.buffer_size)
+ with open(os.path.join(
+ self.dbpath, db.dbname, 'database.conf'), 'w') as f:
+ config.write(f)
+
def kill(self):
print("!!!!!!!!!!!! KILLL !!!!!!!!!!")
os.system('kill -9 {}'.format(self.pid))
assert isinstance(point, list) and len(point) == 2, \
'Expecting a point to be a list of 2 items'
super().assertEqual(a[series][i][0], point[0])
- if math.isnan(a[series][i][1]):
+ if isinstance(a[series][i][1], str):
+ super().assertEqual(a[series][i][1], point[1])
+ elif math.isnan(a[series][i][1]):
assert math.isnan(point[1]), \
'Expecting point `{}` to be `nan`, got: `{}`' \
.format(i, point[1])
*
* Returns 0 if success or EOF in case of an error.
*/
-int siridb_buffer_write_last_point(
+int siridb_buffer_write_point(
siridb_buffer_t * buffer,
- siridb_series_t * series)
+ siridb_series_t * series,
+ uint64_t * ts,
+ qp_via_t * val)
{
- siridb_point_t * point;
const size_t sz = sizeof(uint64_t) + sizeof(qp_via_t);
char buf[sz];
- int last_idx = series->buffer->len - 1;
- assert (last_idx >= 0);
- point = series->buffer->data + last_idx;
-
- memcpy(buf, &point->ts, sizeof(uint64_t));
- memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t));
+ ssize_t last_idx = series->buffer->len - 1;
+ assert (last_idx >= 0);
+ memcpy(buf, ts, sizeof(uint64_t));
+ memcpy(buf + sizeof(uint64_t), val, sizeof(qp_via_t));
return (
/* jump to position where to write the new point */
ts = (uint64_t *) &qp_series_ts.via.int64;
SERIES_UPDATE_TS(series)
+ siridb_series_ensure_type(series, &qp_series_val);
if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 &&
series->buffer != NULL)
{
if (series->tp == TP_STRING)
{
val = &forstr;
- val->str = qp_is_raw(qp_series_val.tp) ?
- strndup(qp_series_val.via.str, qp_series_val.len) :
- strdup("");
-
+ val->str = strndup(qp_series_val.via.str, qp_series_val.len);
if (val->str == NULL)
{
ERR_ALLOC
{
qp_next(unpacker, &qp_series_ts); /* ts */
qp_next(unpacker, &qp_series_val); /* val */
+ siridb_series_ensure_type(series, &qp_series_val);
if (series->tp == TP_STRING)
{
- val->str = qp_is_raw(qp_series_val.tp) ?
- strndup(qp_series_val.via.str, qp_series_val.len) :
- strdup("");
-
+ val->str = \
+ strndup(qp_series_val.via.str, qp_series_val.len);
if (val->str == NULL)
{
ERR_ALLOC
ts = (uint64_t *) &qp_series_ts.via.int64;
SERIES_UPDATE_TS(series)
+ siridb_series_ensure_type(series, &qp_series_val);
+
if ((tp = qp_next(unpacker, qp_series_name)) != QP_ARRAY2 &&
series->buffer != NULL)
{
if (series->tp == TP_STRING)
{
val = &forstr;
- val->str = qp_is_raw(qp_series_val.tp) ?
- strndup(qp_series_val.via.str, qp_series_val.len) :
- strdup("");
-
+ val->str = strndup(qp_series_val.via.str, qp_series_val.len);
if (val->str == NULL)
{
ERR_ALLOC
{
qp_next(unpacker, &qp_series_ts); /* ts */
qp_next(unpacker, &qp_series_val); /* val */
+ siridb_series_ensure_type(series, &qp_series_val);
if (series->tp == TP_STRING)
{
- val->str = qp_is_raw(qp_series_val.tp) ?
- strndup(qp_series_val.via.str, qp_series_val.len) :
- strdup("");
-
+ val->str = \
+ strndup(qp_series_val.via.str, qp_series_val.len);
if (val->str == NULL)
{
ERR_ALLOC
#define BEND series->buffer->points->data[series->buffer->points->len - 1].ts
#define DROPPED_DUMMY 1
+/*
+ * Used for storing double and integers as string. this is not very important
+ * if it will not store all characters generated so 64 is more than enough
+ */
+#define STR_TYPE_BUF_SZ 64
+static char str_type_buf[STR_TYPE_BUF_SZ];
+
static int SERIES_save(siridb_t * siridb);
static int SERIES_load(siridb_t * siridb, imap_t * dropped);
static int SERIES_read_dropped(siridb_t * siridb, imap_t * dropped);
}
else
{
- if (siridb_buffer_write_last_point(siridb->buffer, series))
+ if (siridb_buffer_write_point(siridb->buffer, series, ts, val))
{
ERR_FILE
log_critical("Cannot write new point to buffer");
return points;
}
+void siridb_series_ensure_type(siridb_series_t * series, qp_obj_t * qp_obj)
+{
+ switch(series->tp)
+ {
+ case TP_INT:
+ if (qp_obj->tp != QP_INT64)
+ {
+ if (qp_obj->tp == QP_DOUBLE)
+ {
+ double d = qp_obj->via.real;
+ qp_obj->via.int64 = (int64_t) d;
+ }
+ else if (qp_obj->tp == QP_RAW)
+ {
+ char * s = strndup(qp_obj->via.str, qp_obj->len);
+ qp_obj->via.int64 = \
+ (s == NULL) ? 0 : (int64_t) strtoll(s, NULL, 10);
+ free(s);
+ }
+ else
+ {
+ assert(0);
+ }
+ qp_obj->tp = QP_INT64;
+ }
+ return;
+ case TP_DOUBLE:
+ if (qp_obj->tp != QP_DOUBLE)
+ {
+ if (qp_obj->tp == QP_INT64)
+ {
+ int64_t i = qp_obj->via.int64;
+ qp_obj->via.real = (double) i;
+ }
+ else if (qp_obj->tp == QP_RAW)
+ {
+ char * s = strndup(qp_obj->via.str, qp_obj->len);
+ qp_obj->via.real = \
+ (s == NULL) ? 0.0 : strtod(s, NULL);
+ free(s);
+ }
+ else
+ {
+ assert(0);
+ }
+ qp_obj->tp = TP_DOUBLE;
+ }
+ return;
+ case TP_STRING:
+ if (qp_obj->tp != QP_RAW)
+ {
+ if (qp_obj->tp == QP_INT64)
+ {
+ snprintf(
+ str_type_buf,
+ STR_TYPE_BUF_SZ,
+ "%" PRId64,
+ qp_obj->via.int64);
+ qp_obj->via.str = str_type_buf;
+ }
+ else if (qp_obj->tp == QP_DOUBLE)
+ {
+ snprintf(
+ str_type_buf,
+ STR_TYPE_BUF_SZ,
+ "%f",
+ qp_obj->via.real);
+ qp_obj->via.str = str_type_buf;
+ }
+ else
+ {
+ assert(0);
+ }
+ qp_obj->tp = QP_RAW;
+ }
+ return;
+ }
+ assert (0);
+}
+
/*
* Calculate the server id.
* Returns 0 or 1, representing a server in a pool)
siridb_series_t * series;
qp_types_t tp;
uint32_t series_id;
+ uint8_t series_tp;
/* we should not have any series at this moment */
assert(siridb->max_series_id == 0);
if (imap_get(dropped, series_id) == NULL)
{
+ series_tp = (uint8_t) qp_series_tp.via.int64;
series = SERIES_new(
siridb,
series_id,
- (uint8_t) qp_series_tp.via.int64,
+ series_tp,
siridb->server->pool,
(const char *) qp_series_name.via.raw);
if (series != NULL)